package onion.mqtt.server;

import io.netty.channel.*;
import io.netty.handler.codec.mqtt.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import onion.mqtt.server.dispatcher.IMqttMessageDispatcher;
import onion.mqtt.server.dispatcher.MqttMessageDispatcher;
import onion.mqtt.server.event.IMqttServerConnectListener;
import onion.mqtt.server.manager.MessageManager;
import onion.mqtt.server.manager.SessionManager;
import onion.mqtt.server.store.MessageStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;

/**
 * Netty inbound handler that receives decoded {@link MqttMessage}s and routes each one to the
 * matching {@link MqttServerProcessor} operation. It also reacts to reader-idle timeouts and
 * I/O errors by dispatching the client's will message, notifying the connect listener and
 * closing the channel.
 *
 * <p>The handler is annotated {@link ChannelHandler.Sharable}; the only state it keeps is the
 * set of final collaborators assigned in the constructor.
 *
 * @author Mr, Lu
 * @developmentTeam 浙江允泽信息科技有限公司
 * @createTime 2023/12/12
 */
@ChannelHandler.Sharable
public class MqttServerInboundHandler extends SimpleChannelInboundHandler<MqttMessage> {
    static final Logger log = LoggerFactory.getLogger(MqttServerInboundHandler.class);

    /** Channel attribute under which the client id is read; resolved once instead of per event. */
    private static final AttributeKey<String> CLIENT_ID_KEY = AttributeKey.valueOf(MqttServerConst.CLIENT_ID);

    private final MqttServerProcessor processor;
    private final IMqttMessageDispatcher messageDispatcher;
    private final IMqttServerConnectListener connectStatusListener;

    /**
     * Creates the handler and its collaborators from the server configuration.
     *
     * @param serverBuilder server configuration; passed to the {@link MqttServerProcessor} and
     *                      queried for the optional connect listener
     */
    public MqttServerInboundHandler(MqttServerBuilder serverBuilder) {
        this.processor = new MqttServerProcessor(serverBuilder);
        this.messageDispatcher = new MqttMessageDispatcher();
        this.connectStatusListener = serverBuilder.getConnectListener();
    }

    /**
     * Handles one decoded MQTT message read from the channel.
     *
     * <p>If decoding failed, the connection is closed; for an unacceptable protocol version or a
     * rejected client identifier a CONNACK carrying the matching refusal code is written first.
     * Otherwise the message is dispatched by its fixed-header message type.
     *
     * @param ctx     context of the channel the message arrived on
     * @param message the decoded message (possibly carrying a decoder failure)
     * @throws Exception if the processor fails while handling the message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MqttMessage message) throws Exception {
        // 1. A decoding failure always ends with the connection being closed.
        if (message.decoderResult().isFailure()) {
            this.rejectUndecodable(ctx, message.decoderResult().cause());
            return;
        }
        // 2. Decoding succeeded: dispatch on the message type.
        switch (message.fixedHeader().messageType()) {
            case CONNECT: // client connects
                processor.connect(ctx.channel(), (MqttConnectMessage) message);
                break;
            case DISCONNECT: // client disconnects
                processor.disconnect(ctx.channel());
                break;
            case SUBSCRIBE: // subscribe request
                processor.subscribe(ctx.channel(), (MqttSubscribeMessage) message);
                break;
            case UNSUBSCRIBE: // unsubscribe request
                processor.unsubscribe(ctx.channel(), (MqttUnsubscribeMessage) message);
                break;
            case PUBLISH: // message published by the client
                processor.publish(ctx.channel(), (MqttPublishMessage) message);
                break;
            case PUBACK: // QoS 1 acknowledgement
                processor.pubAck(ctx.channel(), message);
                break;
            case PUBREC: // QoS 2 publish received
                processor.pubRec(ctx.channel(), message);
                break;
            case PUBREL: // QoS 2 publish release
                processor.pubRel(ctx.channel(), message);
                break;
            case PUBCOMP: // QoS 2 publish complete
                processor.pubComp(ctx.channel(), message);
                break;
            case PINGREQ: // client heartbeat
                processor.pingReq(ctx.channel());
                break;
            case PINGRESP: // heartbeat response
                processor.pingRes(ctx.channel());
                break;
            default:
                break;
        }
    }

    /**
     * Refuses a connection whose message could not be decoded and closes it.
     *
     * @param ctx   context of the affected channel
     * @param cause the decoder failure cause
     */
    private void rejectUndecodable(ChannelHandlerContext ctx, Throwable cause) {
        MqttConnectReturnCode refusal = null;
        if (cause instanceof MqttUnacceptableProtocolVersionException) {
            // The client requested a protocol version that is not supported.
            log.debug("decode fail, unacceptable protocol version.");
            refusal = MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION;
        } else if (cause instanceof MqttIdentifierRejectedException) {
            // The client id did not pass validation.
            log.debug("decode fail, identifier rejected.");
            refusal = MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;
        }

        if (refusal == null) {
            log.debug("decode fail, closing connection.", cause);
            ctx.close();
            return;
        }
        // Close only once the write has completed, so the CONNACK is not discarded by a close
        // that overtakes a not-yet-flushed write.
        ctx.writeAndFlush(MqttMessageBuilders.connAck().returnCode(refusal).build())
                .addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * Handles user events. A reader-idle {@link IdleStateEvent} (no data, including heartbeats,
     * received for the configured period) removes the client's session and closes the channel.
     * Every other event (for example the SSL handshake/close completion events) is forwarded to
     * the next handler in the pipeline.
     *
     * @param ctx context of the channel the event was fired on
     * @param evt the user event
     * @throws Exception if forwarding the event fails
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        String clientId = ctx.channel().attr(CLIENT_ID_KEY).get();
        log.debug("clientId: {}, userEventTriggered: {}", clientId, evt);

        boolean readerIdle = evt instanceof IdleStateEvent
                && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE;
        if (!readerIdle) {
            // Not ours to consume: keep the event travelling through the pipeline.
            super.userEventTriggered(ctx, evt);
            return;
        }

        // The attribute is unset when the channel went idle before a client id was stored;
        // in that case there is no session to remove.
        if (clientId != null) {
            SessionManager.getInstance().removeSession(clientId);
        }
        this.invokeCloseChannel(ctx.channel(), clientId);
    }

    /**
     * Handles a {@link Throwable} raised by an I/O error or by a handler while processing an
     * event. An {@link IOException} closes the channel through {@link #invokeCloseChannel};
     * anything else is delegated to the superclass, which passes it on down the pipeline.
     *
     * @param ctx   context of the affected channel
     * @param cause the raised throwable
     * @throws Exception if the superclass handling fails
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        String clientId = ctx.channel().attr(CLIENT_ID_KEY).get();
        // Trailing throwable argument makes SLF4J log the stack trace as well.
        log.debug("clientId: {}, ExceptionCaught", clientId, cause);
        if (cause instanceof IOException) {
            this.invokeCloseChannel(ctx.channel(), clientId);
        } else {
            super.exceptionCaught(ctx, cause);
        }
    }

    /**
     * Closes the channel after dispatching the client's will message and notifying the connect
     * listener. The will message and the offline callback are skipped when no client id is
     * known for the channel; the channel is closed in every case, even if dispatching or the
     * callback throws.
     *
     * @param channel  the channel to close
     * @param clientId id of the client bound to the channel, or {@code null} if none was stored
     */
    private void invokeCloseChannel(Channel channel, String clientId) {
        try {
            if (clientId != null) {
                // Send the will message registered for this client.
                List<MessageStore> willMessage = MessageManager.getInstance().getWillMessageByClient(clientId);
                messageDispatcher.dispatchWillMsg(channel, willMessage);

                // Notify the connection status listener, if one is configured.
                if (connectStatusListener != null) {
                    connectStatusListener.offline(channel, clientId);
                }
            }
        } finally {
            // Close the channel regardless of what happened above.
            channel.close().addListener((ChannelFutureListener) future -> {
                if (!future.isSuccess()) {
                    log.error("channel close error ", future.cause());
                }
            });
        }
    }
}
